這一篇我們要來講解 Spark 的運算機制,我本人是寫 Python的,所以下面都會使用 Python 來講解。
Spark 在執行的機制分為 Transformation 和 Action,Spark 執行的時候是使用一個叫 RDD(Resilient Distributed Dataset)的資料型態在做操作的,並且在做 transformation 是 lazy 的,所謂的 lazy 也就是當你呼叫有關 transformation 的 API 的時候,並不會馬上執行,僅僅紀錄 transformation 的轉換過程,而是要等到你呼叫 action 的 API 的時候,Spark 才會開始動作。
上面講完 Spark 的運作機制後,接著讓我們來小試一下身手,寫一個 wordcount,帶大家了解一下基本的寫法跟 transformation 和 action
首先先假定大家都有 Spark 環境,所以我就直接 import Spark。
from pyspark import SparkContext
def main():
with SparkContext(appName='wordcount') with sc:
# transformation
data = sc.textFile('/inpit/a.txt').map(lambda x: x[0])\
.flatMap(lambda x: x.split(' '))\
.map(lambda x: (x, 1)).reduceByKey(lambda a, b: (a+b))
# action
result = data.collect()
for (word, count) in result:
print("word:", word, "count:", count)
if __name__ == '__main__':
main()
上面是一個 wordcount 的小範例,從讀取檔案以及 map 的過程,還有最後的 reduceByKey,都是屬於 transformation,程式執行到這裡,Spark 只會記錄我們每一個對 RDD 的操作,並還沒有開始做運算,直到我寫了 collect(),Spark 就會開始往回找說這一個 RDD 是由前面哪一個操作產生出來的,一直持續往前找,直到找到最初的源頭,然後一口氣算完,這感覺有一點像遞迴。
以上是 Spark 的程式基本概念,詳細的 transformation 和 action 還有很多 API 可以用,可以多多上官網查詢,如果小弟的文章內容或觀念有誤,或是各位有更好的意見,歡迎在下面留言交流。
幫你補充一篇 Databricks 對於collect()
使用的注意事項:
假如您的RDD資料集過大且無法預期記憶體能完全容納這些資料,不要貿然使用
val values = myVeryLargeRDD.collect()
原因在於collect()
的運作過程中,將會複製應用程式中呼叫該方法的每一個RDD單元,結果將造成run out of memory
!
為了避免此一問題,在擷取資料時,可呼叫take()
及takeSample()
方法來擷取要使用的資料;又或者先透過filtering
或sampling
先行簡化RDD資料!
另外,countByKey()
、countByValue()
、collectAsMap()
等方法在使用上也需留意記憶體是否足夠!
最後,如果在記憶體不足下,仍然必需採用以上四種方法處理資料集,可先將資料集寫至硬碟上又或者直接匯出到資料庫儲存,再對硬碟上的檔案或者資料庫中的資料進行處理!
感謝補充!!